前言
前面我们聊完了Mpsc,在提一下,Mpsc主要是针对的单消费者多生产者的情况。
对于消费者而言,因为只有一个消费者,所以不需要任何同步。
对于生产者而言,为了防止多线程下会出现问题,所以使用CAS操作。
但是上一篇文章中Mpsc使用的是链表的结构,不加以控制容易OOM。
为了避免这个问题,我们可以使用数组来作为底层存储。
原理其实和这边文章的RingBuffer讲的类似。
这里的RingBuffer其实就是Disruptor的实现,不过我单独抽成了一个文章来讲Disruptor的源码。
这里就划一些示意图,解释RingBuffer的原理。
位置的二阶段写入
想象一下,如果我们有一个数组,我们要添加一个元素,单线程写入
1 | public void put(T obj) { |
这样当然是没有问题的,如果是多线程呢,这样就会出现问题,因为Index++不是原子操作。
可能两个线程进入后,写入的其实是同一个位置的元素。
那怎么办呢?
我们可以简单的加锁
1 | public synchronized void put(T obj) { |
但是这样性能就会下降很多。
有没有其他的办法,就要借助简单的CAS就可以实现呢?
前面提到,问题在于Index++不是原子的操作,那么我们将Index++这个操作,变成原子的不就行了。
1 | public void put(T obj) { |
这样就行了。
这里我们把操作分为了两步
- 第一步,占位,这个操作是原子的
- 第二步,写入元素
RingBuffer的写入
有了上述的两阶段写入的基础,对于RingBuffer的写入就好理解了。
假设我们的写入偏移指针指向arr[1]
。
这个时候,有一个Producer想要写入一个数据咋办。
我们CAS这个Cursor,获取下一个位置的写入权限
伪代码如下:
1 | public int nextPosition() { |
获取成功后,往这个位置写入数据就行。
如果这个生产者同时有N个元素需要写入,那我们就直接申请下面N个位置的权限,然后依次写入就行。
代码上只要修改CAS的第三个参数就行。
所以,对于写入而言,多线程写入,只要CAS这个写入的偏移指针,先获取写入的位置信息,下面再塞入数据,可以极大的避免Lock。
如图所示,显示有2个线程同时在写入数据。
注意,这里存在一种状态,数据还未被完全写入成功,Write Cursor之前的格子里并不是全部都有数据了。
RingBuffer读取
对于RingBuffer的读取,其实也比较简单,也是一个二阶段的读取。
先对readIndex,进行CAS的加n。
成功后,读取返回的值的Position的值。
但是这里注意一个问题:
- 返回的Position位置的值可能还未被写入,所以读取的可能是个空。
怎么解决这个问题呢?
其实也不算是个问题,反复读取几遍,直到不为空就行。
但是在Disruptor中,使用了完全不同的做法,和我的方法略有不同。在下一篇文章中会讲。
ReadIndex和WriteIndex的冲突
从上面的文章,我们可以理解为没有冲突,也就是这个数组的长度是无限长的。
所以我们仅仅维护了一个WriteIndex。
但是在实际的情况中,数组的长度都固定的,到了末尾之后就要从头开始写。
所以这里仅仅维护一个WriteIndex是不够的,还需要维护一个ReadIndex。
使用这两个Index来判断数组是否是空的或者已经满了。